package com.tomato.dynamic.threadpool.core.thread;

import com.tomato.dynamic.threadpool.core.manage.AlarmManager;
import com.tomato.dynamic.threadpool.core.reject.RejectHandlerGetter;
import com.tomato.dynamic.threadpool.core.support.DynamicExecutorSupport;
import com.tomato.dynamic.threadpool.core.support.DynamicThreadPoolRunnable;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.tomato.dynamic.threadpool.common.enums.NotifyTypeEnum.QUEUE_TIMEOUT;
import static com.tomato.dynamic.threadpool.common.enums.NotifyTypeEnum.RUN_TIMEOUT;

/**
 * Dynamic thread pool executor.
 *
 * <p>Extends {@link DynamicExecutorSupport} with:
 * <ul>
 *   <li>counters for rejected tasks, tasks that ran too long and tasks that
 *       waited too long in the queue;</li>
 *   <li>run-time and queue-wait thresholds (milliseconds, compared against
 *       {@link System#currentTimeMillis()} deltas) that can be changed while the
 *       pool is running; a value {@code <= 0} disables the corresponding check;</li>
 *   <li>alarms raised through {@link AlarmManager} when a threshold is exceeded.</li>
 * </ul>
 *
 * <p>Timing is only tracked for tasks that were wrapped in a
 * {@link DynamicThreadPoolRunnable} by {@link #execute(Runnable)}, i.e. tasks
 * submitted while at least one threshold was enabled.
 *
 * @author lizhifu
 * @date 2022/4/8
 */
@Slf4j
public class DynamicThreadPoolExecutor extends DynamicExecutorSupport {
    /**
     * Number of rejected tasks, accumulated through {@link #incRejectCount(int)}.
     */
    private final AtomicInteger rejectCount = new AtomicInteger(0);
    /**
     * Number of tasks whose execution time exceeded {@link #runTimeoutThreshold}.
     */
    private final AtomicInteger runTimeoutCount = new AtomicInteger();

    /**
     * Number of tasks whose queue waiting time exceeded {@link #queueTimeoutThreshold}.
     */
    private final AtomicInteger queueTimeoutCount = new AtomicInteger();
    /**
     * Task execution timeout threshold in milliseconds; {@code <= 0} disables the check.
     * Volatile because it is written by configuration threads and read by
     * submitting and worker threads.
     */
    private volatile long runTimeoutThreshold = 0;

    /**
     * Queue waiting timeout threshold in milliseconds; {@code <= 0} disables the check.
     * Volatile for the same reason as {@link #runTimeoutThreshold}.
     */
    private volatile long queueTimeoutThreshold = 0;
    /**
     * Whether all core threads should be pre-started.
     * Applied by {@link #setPreStartAllCoreThreads(boolean)}.
     */
    private boolean preStartAllCoreThreads;
    /**
     * The original (un-proxied) rejection handler supplied to the constructor.
     */
    @Getter
    @Setter
    private RejectedExecutionHandler redundancyHandler;

    /**
     * Creates the executor and installs a proxied rejection handler.
     *
     * @param corePoolSize    number of core threads
     * @param maximumPoolSize maximum number of threads
     * @param keepAliveTime   idle time before excess threads are terminated
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     * @param threadFactory   factory used to create worker threads
     * @param handler         rejection handler; it is kept as-is in
     *                        {@code redundancyHandler} while the pool itself uses the
     *                        proxy returned by {@link RejectHandlerGetter#getProxy}
     */
    public DynamicThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

        RejectedExecutionHandler rejectedExecutionHandler = RejectHandlerGetter.getProxy(handler);
        setRejectedExecutionHandler(rejectedExecutionHandler);

        redundancyHandler = handler;

        // Pre-starting core threads is handled in setPreStartAllCoreThreads:
        // the flag can only be assigned after construction, so checking it here
        // would always see false.
    }

    /**
     * Stamps the start time on wrapped tasks and raises a queue-timeout alarm
     * when the task waited longer than the queue threshold.
     *
     * @param t the thread that will run the task
     * @param r the task about to be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (r instanceof DynamicThreadPoolRunnable) {
            log.info("{} 开始执行任务 beforeExecute:{}",threadPoolName, r);
            DynamicThreadPoolRunnable task = (DynamicThreadPoolRunnable) r;
            long now = System.currentTimeMillis();

            // Always record the start time: the run threshold may be switched on
            // while this task is executing, and afterExecute must then measure
            // from a real timestamp instead of an unset one.
            task.setStartTime(now);

            // Read the volatile once so the check and the comparison agree.
            long queueThreshold = queueTimeoutThreshold;
            if (queueThreshold > 0 && now - task.getSubmitTime() > queueThreshold) {
                queueTimeoutCount.incrementAndGet();
                AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
            }
        }
        super.beforeExecute(t, r);
    }

    /**
     * Submits a task, wrapping it in a {@link DynamicThreadPoolRunnable} when at
     * least one timeout threshold is enabled so that its timing can be tracked.
     *
     * @param command the task to execute
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        // Fail fast as the execute contract requires; otherwise a null would be
        // hidden inside the wrapper and only surface later.
        if (command == null) {
            throw new NullPointerException("command");
        }
        if (runTimeoutThreshold > 0 || queueTimeoutThreshold > 0) {
            command = new DynamicThreadPoolRunnable(command);
        }
        super.execute(command);
    }

    /**
     * Raises a run-timeout alarm when a wrapped task ran longer than the run threshold.
     *
     * @param r the task that has completed
     * @param t the exception that caused termination, or {@code null}
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if (r instanceof DynamicThreadPoolRunnable) {
            DynamicThreadPoolRunnable task = (DynamicThreadPoolRunnable) r;
            // Read the volatile once so the check and the comparison agree.
            long runThreshold = runTimeoutThreshold;
            if (runThreshold > 0) {
                long runTime = System.currentTimeMillis() - task.getStartTime();
                if (runTime > runThreshold) {
                    log.info("{} 执行任务超时 afterExecute:{},runTime:{}",threadPoolName, r,runTime);
                    runTimeoutCount.incrementAndGet();
                    AlarmManager.doAlarm(this, RUN_TIMEOUT);
                }
            }
        }
        super.afterExecute(r, t);
    }

    /**
     * Adds to the rejected-task counter.
     *
     * @param count number of rejections to add
     */
    public void incRejectCount(int count) {
        rejectCount.addAndGet(count);
    }

    /**
     * Returns the number of rejected tasks recorded so far.
     *
     * @return current rejection count
     */
    public int getRejectCount() {
        return rejectCount.get();
    }

    /**
     * Returns the number of tasks that exceeded the run timeout threshold.
     *
     * @return current run-timeout count
     */
    public int getRunTimeoutCount() {
        return runTimeoutCount.get();
    }

    /**
     * Returns the number of tasks that exceeded the queue timeout threshold.
     *
     * @return current queue-timeout count
     */
    public int getQueueTimeoutCount() {
        return queueTimeoutCount.get();
    }

    /**
     * Bean-style setter (so the value can be assigned reflectively) that controls
     * whether core threads may time out; delegates to {@code allowCoreThreadTimeOut}.
     *
     * @param allowCoreThreadTimeOut {@code true} to let core threads time out
     */
    public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        log.info("{} 是否允许核心线程超时 setAllowCoreThreadTimeOut:{}",threadPoolName, allowCoreThreadTimeOut);
        allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    }

    /**
     * Returns the task execution timeout threshold.
     *
     * @return threshold in milliseconds; {@code <= 0} means disabled
     */
    public long getRunTimeoutThreshold() {
        return runTimeoutThreshold;
    }

    /**
     * Sets the task execution timeout threshold.
     *
     * @param runTimeoutThreshold threshold in milliseconds; {@code <= 0} disables the check
     */
    public void setRunTimeoutThreshold(long runTimeoutThreshold) {
        this.runTimeoutThreshold = runTimeoutThreshold;
    }

    /**
     * Returns the queue waiting timeout threshold.
     *
     * @return threshold in milliseconds; {@code <= 0} means disabled
     */
    public long getQueueTimeoutThreshold() {
        return queueTimeoutThreshold;
    }

    /**
     * Sets the queue waiting timeout threshold.
     *
     * @param queueTimeoutThreshold threshold in milliseconds; {@code <= 0} disables the check
     */
    public void setQueueTimeoutThreshold(long queueTimeoutThreshold) {
        this.queueTimeoutThreshold = queueTimeoutThreshold;
    }

    /**
     * Stores the pre-start flag and, when it is {@code true}, starts all core
     * threads immediately. Calling it repeatedly is harmless: only missing core
     * threads are started.
     *
     * @param preStartAllCoreThreads {@code true} to pre-start all core threads
     */
    public void setPreStartAllCoreThreads(boolean preStartAllCoreThreads) {
        this.preStartAllCoreThreads = preStartAllCoreThreads;
        if (preStartAllCoreThreads) {
            prestartAllCoreThreads();
        }
    }
}
